package com.parkingwang.learning.operators;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observables.GroupedObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TransformOperator {


    /**
     * Entry point: runs exactly one operator demo per launch.
     * <p>
     * To try a different operator, comment out the active call and enable one of the
     * disabled calls listed in the body.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Demos that are currently switched off:
        // mapOper();
        // flatMapOper();
        // groupByOper();
        // bufferOper();
        // windowOper();
        // firstOper();
        // take();

        // Active demo.
        takeTime();
    }


    /**
     * Demonstrates the time-based {@code take(time, unit)} operator.
     * <p>
     * The source is {@code intervalRange(1, 9, 0, 1, SECONDS)}: the values 1 to 9, one per
     * second, with no initial delay. {@code take(5, SECONDS)} relays only what arrives within
     * the first five seconds, and every relayed value is printed on its own line.
     * <p>
     * The interval is timer-driven rather than emitted synchronously from {@code subscribe},
     * so the calling thread sleeps for ten seconds to give the demo time to finish.
     */
    private static void takeTime() {
        Observable.intervalRange(1, 9, 0, 1, TimeUnit.SECONDS)
                .take(5, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so
            // that code further up the call stack can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Demonstrates {@code take(n)}: only the first N items are relayed, after which the
     * stream completes immediately.
     * <p>
     * When N is larger than the number of items the source emits, no exception is thrown
     * and the error callback is not invoked.
     * <p>
     * {@code takeLast()} works the same way, but keeps the last N items instead.
     */
    private static void take() {
        // Alternative variant kept for reference: an endless interval cut down to 5 items,
        // with a completion callback and a sleep to keep the JVM alive.
//        Observable.interval(0,1,TimeUnit.SECONDS)
//                .take(5)
//                .subscribe(new Consumer<Long>() {
//                    @Override
//                    public void accept(Long aLong) throws Exception {
//                        System.out.println(aLong);
//                    }
//                }, new Consumer<Throwable>() {
//                    @Override
//                    public void accept(Throwable throwable) throws Exception {
//
//                    }
//                }, new Action() {
//                    @Override
//                    public void run() throws Exception {
//                        System.out.println("complete");
//                    }
//                });
//
//        try {
//            Thread.sleep(8000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }

        // Prints every value that makes it through take().
        final Consumer<Integer> printItem = new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println(value);
            }
        };

        // Prints the error, should one ever be signalled.
        final Consumer<Throwable> printError = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable error) throws Exception {
                System.out.println(error);
            }
        };

        // The source has only 3 items although 5 are requested.
        Observable<Integer> firstFive = Observable.range(1, 3).take(5);
        firstFive.subscribe(printItem, printError);
    }

    /**
     * Demonstrates the "first item" family of operators.
     * <ul>
     *   <li>{@code first(defaultItem)} - takes the first item, or the default if there is none;</li>
     *   <li>{@code firstOrError()} - either yields the first item or signals an error;</li>
     *   <li>{@code firstElement()} - takes the first item and has no default value.</li>
     * </ul>
     * The {@code last*} operators are analogous, but take the final item.
     * <p>
     * This demo applies {@code firstElement()} to an empty source.
     */
    private static void firstOper() {
        final Consumer<Integer> printItem = new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println(value);
            }
        };

        // Swap firstElement() for first(10) to see the default-value variant:
//                .first(10)
        Observable.<Integer>empty()
                .firstElement()
                .subscribe(printItem);
    }

    /**
     * Demonstrates {@code window(count)}: much like {@code buffer}, except that each
     * package is delivered as an {@code Observable} instead of a {@code List}.
     * <p>
     * A separator line is printed for every window, followed by the window's items.
     */
    private static void windowOper() {
        // Stateless printer shared by all windows.
        final Consumer<Integer> printItem = new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println(value);
            }
        };

        Consumer<Observable<Integer>> onWindow = new Consumer<Observable<Integer>>() {
            @Override
            public void accept(Observable<Integer> window) throws Exception {
                System.out.println("-----");
                window.subscribe(printItem);
            }
        };

        Observable<Observable<Integer>> windows = Observable.range(1, 10).window(3);
        windows.subscribe(onWindow);
    }


    /**
     * Demonstrates {@code buffer}.
     * <ul>
     *   <li>{@code buffer(N)}: packs the stream into lists of N items each.</li>
     *   <li>{@code buffer(N, M)}: packs the stream into lists of N items, where every list
     *       starts M items after the start of the previous one.</li>
     * </ul>
     * The active demo feeds {@code buffer(3)} from a hand-written source that signals an
     * error after its fourth item and then keeps calling {@code onNext}.
     */
    private static void bufferOper() {
        // Variant 1: plain buffer(3) over 1..10.
//        Observable.range(1,10)
//                .buffer(3)
//                .subscribe(new Consumer<List<Integer>>() {
//                    @Override
//                    public void accept(List<Integer> integers) throws Exception {
//                        System.out.println(integers);
//                    }
//                });

        // Variant 2: buffers of 5 items, each starting 2 items after the previous one.
//        Observable.range(1,10)
//                .buffer(5,2)
//                .subscribe(new Consumer<List<Integer>>() {
//                    @Override
//                    public void accept(List<Integer> integers) throws Exception {
//                        System.out.println(integers);
//                    }
//                });

        // Variant 3 (active): a source that fails in the middle of the sequence.
        ObservableOnSubscribe<Integer> failingSource = new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int value = 1; value <= 4; value++) {
                    emitter.onNext(value);
                }
                emitter.onError(new Throwable("error"));
                // Deliberately keep emitting after the error signal.
                for (int value = 5; value <= 8; value++) {
                    emitter.onNext(value);
                }
            }
        };

        Consumer<List<Integer>> printBuffer = new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> chunk) throws Exception {
                System.out.println(chunk);
            }
        };

        Consumer<Throwable> printError = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable error) throws Exception {
                System.out.println(error.toString());
            }
        };

        Action printComplete = new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("complete");
            }
        };

        Observable<Integer> observable = Observable.create(failingSource);
        observable.buffer(3).subscribe(printBuffer, printError, printComplete);
    }

    /**
     * Demonstrates {@code groupBy}: the numbers 1 to 10 are split into two groups keyed by
     * parity, and only the items of the even group are printed.
     */
    private static void groupByOper() {
        // Key selector: "偶数组" (even group) for even numbers, "奇数组" (odd group) otherwise.
        Function<Integer, String> parityKey = new Function<Integer, String>() {
            @Override
            public String apply(Integer value) throws Exception {
                if (value % 2 == 0) {
                    return "偶数组";
                }
                return "奇数组";
            }
        };

        final Consumer<Integer> printItem = new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println(value);
            }
        };

        Consumer<GroupedObservable<String, Integer>> onGroup =
                new Consumer<GroupedObservable<String, Integer>>() {
                    @Override
                    public void accept(GroupedObservable<String, Integer> group) throws Exception {
                        // Every group other than the even one is left unsubscribed.
                        if (!group.getKey().equals("偶数组")) {
                            return;
                        }
                        group.subscribe(printItem);
                    }
                };

        Observable.range(1, 10)
                .groupBy(parityKey)
                .subscribe(onGroup);
    }

    /**
     * Pulls the list out of a model object and emits its elements one at a time.
     * <p>
     * With {@code flatMap} the emissions of the inner sources may interleave; use
     * {@code concatMap} when order must be preserved. The active demo uses
     * {@code concatMapIterable}, which maps the user straight to its address list.
     */
    private static void flatMapOper() {
        // Build ten sample addresses for one user.
        List<User.Address> addresses = new ArrayList<>();
        for (int index = 0; index < 10; index++) {
            addresses.add(new User.Address("小明", "地址" + index));
        }

        User user = new User();
        user.name = "小明";
        user.addressList = addresses;

        // Equivalent variant using concatMap with an inner Observable:
//        Observable.just(user)
//                .concatMap(new Function<User, ObservableSource<User.Address>>() {
//                    @Override
//                    public ObservableSource<User.Address> apply(User user) throws Exception {
//                        return Observable.fromIterable(user.addressList);
//                    }
//                })
//                .subscribe(new Consumer<User.Address>() {
//                    @Override
//                    public void accept(User.Address address) throws Exception {
//                        System.out.println(address.city+"--"+address.street);
//                    }
//                });

        Function<User, Iterable<User.Address>> toAddresses = new Function<User, Iterable<User.Address>>() {
            @Override
            public Iterable<User.Address> apply(User owner) throws Exception {
                return owner.addressList;
            }
        };

        Consumer<User.Address> printAddress = new Consumer<User.Address>() {
            @Override
            public void accept(User.Address entry) throws Exception {
                System.out.println(entry.city + "--" + entry.street);
            }
        };

        Observable.just(user)
                .concatMapIterable(toAddresses)
                .subscribe(printAddress);
    }


    /**
     * Simple mutable model used by the flatMap demo: a named user owning a list of addresses.
     */
    static class User {

        /** Display name of the user. */
        public String name;

        /** Addresses belonging to the user; stays {@code null} until assigned. */
        public List<Address> addressList;

        /** Renders as {@code User{name='...', addressList=...}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("User{");
            text.append("name='").append(name).append('\'');
            text.append(", addressList=").append(addressList);
            text.append('}');
            return text.toString();
        }

        /**
         * A city/street pair.
         */
        static class Address {

            public String city;
            public String street;

            /**
             * @param city   value stored in {@link #city}
             * @param street value stored in {@link #street}
             */
            public Address(String city, String street) {
                this.city = city;
                this.street = street;
            }

            /** Renders as {@code Address{city='...', street='...'}}. */
            @Override
            public String toString() {
                StringBuilder text = new StringBuilder("Address{");
                text.append("city='").append(city).append('\'');
                text.append(", street='").append(street).append('\'');
                text.append('}');
                return text.toString();
            }
        }
    }


    /**
     * Demonstrates {@code map}: each of the integers 1, 2 and 3 is converted into a
     * {@code Person} built from that integer, and the resulting object is printed.
     */
    private static void mapOper() {
        Function<Integer, Person> toPerson = new Function<Integer, Person>() {
            @Override
            public Person apply(Integer number) throws Exception {
                String label = "天字" + number + "号";
                return new Person(number, label);
            }
        };

        Consumer<Person> printPerson = new Consumer<Person>() {
            @Override
            public void accept(Person mapped) throws Exception {
                System.out.println(mapped);
            }
        };

        Observable<Person> people = Observable.just(1, 2, 3).map(toPerson);
        people.subscribe(printPerson);
    }


    /**
     * Simple model used by the map demo: a numeric id paired with a name.
     */
    static class Person {

        public int id;
        public String name;

        /**
         * @param id   value stored in {@link #id}
         * @param name value stored in {@link #name}
         */
        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        /** Renders as {@code Person{id=..., name='...'}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Person{");
            text.append("id=").append(id);
            text.append(", name='").append(name).append('\'');
            text.append('}');
            return text.toString();
        }
    }


}
